| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593 |
2x
2x
2x
2x
2x
2x
2x
2x
2x
350x
350x
350x
2x
350x
350x
350x
350x
350x
350x
350x
350x
339x
350x
350x
1x
1x
1x
1x
349x
2x
353x
353x
21x
21x
16x
21x
9x
12x
12x
353x
2x
303x
303x
303x
159x
159x
303x
2x
2x
2x
379x
2x
271x
271x
271x
271x
271x
2x
310x
2x
165x
165x
394x
394x
394x
394x
394x
394x
394x
426x
426x
426x
394x
394x
45x
45x
45x
1169x
1169x
1169x
1169x
323x
317x
317x
323x
1169x
28x
28x
28x
47x
13x
13x
13x
51x
1164x
1164x
1164x
1164x
1164x
643x
643x
643x
192x
192x
451x
451x
451x
451x
1164x
119x
119x
119x
119x
119x
119x
119x
119x
21x
21x
21x
2x
2x
19x
3x
16x
119x
119x
119x
14x
14x
14x
14x
119x
2x
320x
320x
320x
332x
332x
332x
332x
332x
332x
332x
332x
349x
349x
349x
349x
97x
320x
298x
298x
298x
157x
141x
141x
141x
2x
2x
2x
2x
141x
2x
114x
68x
68x
68x
68x
68x
68x
28x
28x
5x
28x
68x
2x
2699x
2x
436x
436x
3090x
2238x
787x
6115x
6115x
/**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { User } from '../auth/user';
import { Query } from '../core/query';
import { Timestamp } from '../core/timestamp';
import { BatchId, ProtoByteString } from '../core/types';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch';
import { ResourcePath } from '../model/path';
import { assert, fail } from '../util/assert';
import { immediatePredecessor, primitiveComparator } from '../util/misc';
import { SortedSet } from '../util/sorted_set';
import * as EncodedResourcePath from './encoded_resource_path';
import { GarbageCollector } from './garbage_collector';
import {
DbDocumentMutation,
DbDocumentMutationKey,
DbMutationBatch,
DbMutationBatchKey,
DbMutationQueue,
DbMutationQueueKey
} from './indexeddb_schema';
import { LocalSerializer } from './local_serializer';
import { MutationQueue } from './mutation_queue';
import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { SimpleDbStore, SimpleDbTransaction } from './simple_db';
/** A mutation queue for a specific user, backed by IndexedDB. */
export class IndexedDbMutationQueue implements MutationQueue {
/**
* Next value to use when assigning sequential IDs to each mutation batch.
*
* NOTE: There can only be one IndexedDbMutationQueue for a given db at a
* time, hence it is safe to track nextBatchID as an instance-level property.
* Should we ever relax this constraint we'll need to revisit this.
*/
private nextBatchId: BatchId;
/**
* A write-through cache copy of the metadata describing the current queue.
*/
private metadata: DbMutationQueue;
private garbageCollector: GarbageCollector | null = null;
constructor(
/**
* The normalized userId (e.g. null UID => "" userId) used to store /
* retrieve mutations.
*/
private userId: string,
private serializer: LocalSerializer
) {}
/**
* Creates a new mutation queue for the given user.
* @param user The user for which to create a mutation queue.
* @param serializer The serializer to use when persisting to IndexedDb.
*/
static forUser(user: User, serializer: LocalSerializer) {
// TODO(mcg): Figure out what constraints there are on userIDs
// In particular, are there any reserved characters? are empty ids allowed?
// For the moment store these together in the same mutations table assuming
// that empty userIDs aren't allowed.
assert(user.uid !== '', 'UserID must not be an empty string.');
const userId = user.isAuthenticated() ? user.uid! : '';
return new IndexedDbMutationQueue(userId, serializer);
}
start(transaction: PersistenceTransaction): PersistencePromise<void> {
return IndexedDbMutationQueue.loadNextBatchIdFromDb(transaction)
.next(nextBatchId => {
this.nextBatchId = nextBatchId;
return mutationQueuesStore(transaction).get(this.userId);
})
.next((metadata: DbMutationQueue | null) => {
if (!metadata) {
metadata = new DbMutationQueue(
this.userId,
BATCHID_UNKNOWN,
/*lastStreamToken=*/ ''
);
}
this.metadata = metadata;
// On restart, nextBatchId may end up lower than
// lastAcknowledgedBatchId since it's computed from the queue
// contents, and there may be no mutations in the queue. In this
// case, we need to reset lastAcknowledgedBatchId (which is safe
// since the queue must be empty).
if (this.metadata.lastAcknowledgedBatchId >= this.nextBatchId) {
return this.checkEmpty(transaction).next(empty => {
assert(
empty,
'Reset nextBatchID is only possible when the queue is empty'
);
this.metadata.lastAcknowledgedBatchId = BATCHID_UNKNOWN;
return mutationQueuesStore(transaction).put(this.metadata);
});
} else {
return PersistencePromise.resolve();
}
});
}
/**
* Returns one larger than the largest batch ID that has been stored. If there
* are no mutations returns 0. Note that batch IDs are global.
*/
static loadNextBatchIdFromDb(
txn: PersistenceTransaction
): PersistencePromise<BatchId> {
let maxBatchId = BATCHID_UNKNOWN;
return mutationsStore(txn)
.iterate({ reverse: true }, (key, batch, control) => {
const [userId, batchId] = key;
if (batchId > maxBatchId) {
maxBatchId = batch.batchId;
}
if (userId === '') {
// We can't compute a predecessor for the empty string, since it
// is lexographically first. That also means that no other
// userIds can come before this one, so we can just exit early.
control.done();
} else {
const nextUser = immediatePredecessor(userId);
control.skip([nextUser]);
}
})
.next(() => maxBatchId + 1);
}
checkEmpty(transaction: PersistenceTransaction): PersistencePromise<boolean> {
let empty = true;
const range = IDBKeyRange.bound(
this.keyForBatchId(Number.NEGATIVE_INFINITY),
this.keyForBatchId(Number.POSITIVE_INFINITY)
);
return mutationsStore(transaction)
.iterate({ range }, (key, value, control) => {
empty = false;
control.done();
})
.next(() => empty);
}
getNextBatchId(
transaction: PersistenceTransaction
): PersistencePromise<BatchId> {
return PersistencePromise.resolve(this.nextBatchId);
}
getHighestAcknowledgedBatchId(
transaction: PersistenceTransaction
): PersistencePromise<BatchId> {
return PersistencePromise.resolve(this.metadata.lastAcknowledgedBatchId);
}
acknowledgeBatch(
transaction: PersistenceTransaction,
batch: MutationBatch,
streamToken: ProtoByteString
): PersistencePromise<void> {
const batchId = batch.batchId;
assert(
batchId > this.metadata.lastAcknowledgedBatchId,
'Mutation batchIDs must be acknowledged in order'
);
this.metadata.lastAcknowledgedBatchId = batchId;
this.metadata.lastStreamToken = validateStreamToken(streamToken);
return mutationQueuesStore(transaction).put(this.metadata);
}
getLastStreamToken(
transaction: PersistenceTransaction
): PersistencePromise<ProtoByteString> {
return PersistencePromise.resolve(this.metadata.lastStreamToken);
}
setLastStreamToken(
transaction: PersistenceTransaction,
streamToken: ProtoByteString
): PersistencePromise<void> {
this.metadata.lastStreamToken = validateStreamToken(streamToken);
return mutationQueuesStore(transaction).put(this.metadata);
}
addMutationBatch(
transaction: PersistenceTransaction,
localWriteTime: Timestamp,
mutations: Mutation[]
): PersistencePromise<MutationBatch> {
const batchId = this.nextBatchId;
this.nextBatchId++;
const batch = new MutationBatch(batchId, localWriteTime, mutations);
const dbBatch = this.serializer.toDbMutationBatch(this.userId, batch);
return mutationsStore(transaction)
.put(dbBatch)
.next(() => {
const promises: Array<PersistencePromise<void>> = [];
for (const mutation of mutations) {
const indexKey = DbDocumentMutation.key(
this.userId,
mutation.key.path,
batchId
);
documentMutationsStore(transaction).put(
indexKey,
DbDocumentMutation.PLACEHOLDER
);
}
return PersistencePromise.waitFor(promises);
})
.next(() => {
return batch;
});
}
lookupMutationBatch(
transaction: PersistenceTransaction,
batchId: BatchId
): PersistencePromise<MutationBatch | null> {
return mutationsStore(transaction)
.get(this.keyForBatchId(batchId))
.next(
dbBatch =>
dbBatch ? this.serializer.fromDbMutationBatch(dbBatch) : null
);
}
getNextMutationBatchAfterBatchId(
transaction: PersistenceTransaction,
batchId: BatchId
): PersistencePromise<MutationBatch | null> {
const range = IDBKeyRange.lowerBound(this.keyForBatchId(batchId + 1));
let foundBatch: MutationBatch | null = null;
return mutationsStore(transaction)
.iterate({ range }, (key, dbBatch, control) => {
if (dbBatch.userId === this.userId) {
assert(
dbBatch.batchId > batchId,
'Should have found mutation after ' + batchId
);
foundBatch = this.serializer.fromDbMutationBatch(dbBatch);
}
control.done();
})
.next(() => foundBatch);
}
getAllMutationBatches(
transaction: PersistenceTransaction
): PersistencePromise<MutationBatch[]> {
const range = IDBKeyRange.bound(
this.keyForBatchId(BATCHID_UNKNOWN),
this.keyForBatchId(Number.POSITIVE_INFINITY)
);
return mutationsStore(transaction)
.loadAll(range)
.next(dbBatches =>
dbBatches.map(dbBatch => this.serializer.fromDbMutationBatch(dbBatch))
);
}
getAllMutationBatchesThroughBatchId(
transaction: PersistenceTransaction,
batchId: BatchId
): PersistencePromise<MutationBatch[]> {
const range = IDBKeyRange.bound(
this.keyForBatchId(BATCHID_UNKNOWN),
this.keyForBatchId(batchId)
);
return mutationsStore(transaction)
.loadAll(range)
.next(dbBatches =>
dbBatches.map(dbBatch => this.serializer.fromDbMutationBatch(dbBatch))
);
}
getAllMutationBatchesAffectingDocumentKey(
transaction: PersistenceTransaction,
documentKey: DocumentKey
): PersistencePromise<MutationBatch[]> {
// Scan the document-mutation index starting with a prefix starting with
// the given documentKey.
const indexPrefix = DbDocumentMutation.prefixForPath(
this.userId,
documentKey.path
);
const indexStart = IDBKeyRange.lowerBound(indexPrefix);
const results: MutationBatch[] = [];
return documentMutationsStore(transaction)
.iterate({ range: indexStart }, (indexKey, _, control) => {
const [userID, encodedPath, batchID] = indexKey;
// Only consider rows matching exactly the specific key of
// interest. Note that because we order by path first, and we
// order terminators before path separators, we'll encounter all
// the index rows for documentKey contiguously. In particular, all
// the rows for documentKey will occur before any rows for
// documents nested in a subcollection beneath documentKey so we
// can stop as soon as we hit any such row.
const path = EncodedResourcePath.decode(encodedPath);
if (userID !== this.userId || !documentKey.path.isEqual(path)) {
control.done();
return;
}
const mutationKey = this.keyForBatchId(batchID);
// Look up the mutation batch in the store.
// PORTING NOTE: because iteration is callback driven in the web,
// we just look up the key instead of keeping an open iterator
// like iOS.
return mutationsStore(transaction)
.get(mutationKey)
.next(dbBatch => {
Iif (dbBatch === null) {
fail(
'Dangling document-mutation reference found: ' +
indexKey +
' which points to ' +
mutationKey
);
}
results.push(this.serializer.fromDbMutationBatch(dbBatch!));
});
})
.next(() => results);
}
getAllMutationBatchesAffectingQuery(
transaction: PersistenceTransaction,
query: Query
): PersistencePromise<MutationBatch[]> {
assert(
!query.isDocumentQuery(),
"Document queries shouldn't go down this path"
);
const queryPath = query.path;
const immediateChildrenLength = queryPath.length + 1;
// TODO(mcg): Actually implement a single-collection query
//
// This is actually executing an ancestor query, traversing the whole
// subtree below the collection which can be horrifically inefficient for
// some structures. The right way to solve this is to implement the full
// value index, but that's not in the cards in the near future so this is
// the best we can do for the moment.
//
// Since we don't yet index the actual properties in the mutations, our
// current approach is to just return all mutation batches that affect
// documents in the collection being queried.
const indexPrefix = DbDocumentMutation.prefixForPath(
this.userId,
queryPath
);
const indexStart = IDBKeyRange.lowerBound(indexPrefix);
// Collect up unique batchIDs encountered during a scan of the index. Use a
// SortedSet to accumulate batch IDs so they can be traversed in order in a
// scan of the main table.
let uniqueBatchIDs = new SortedSet<BatchId>(primitiveComparator);
return documentMutationsStore(transaction)
.iterate({ range: indexStart }, (indexKey, _, control) => {
const [userID, encodedPath, batchID] = indexKey;
const path = EncodedResourcePath.decode(encodedPath);
if (userID !== this.userId || !queryPath.isPrefixOf(path)) {
control.done();
return;
}
// Rows with document keys more than one segment longer than the
// query path can't be matches. For example, a query on 'rooms'
// can't match the document /rooms/abc/messages/xyx.
// TODO(mcg): we'll need a different scanner when we implement
// ancestor queries.
if (path.length !== immediateChildrenLength) {
return;
}
uniqueBatchIDs = uniqueBatchIDs.add(batchID);
})
.next(() => {
const results: MutationBatch[] = [];
const promises: Array<PersistencePromise<void>> = [];
// TODO(rockwood): Implement this using iterate.
uniqueBatchIDs.forEach(batchID => {
const mutationKey = this.keyForBatchId(batchID);
promises.push(
mutationsStore(transaction)
.get(mutationKey)
.next(mutation => {
Iif (mutation === null) {
fail(
'Dangling document-mutation reference found, ' +
'which points to ' +
mutationKey
);
}
results.push(this.serializer.fromDbMutationBatch(mutation!));
})
);
});
return PersistencePromise.waitFor(promises).next(() => results);
});
}
removeMutationBatches(
transaction: PersistenceTransaction,
batches: MutationBatch[]
): PersistencePromise<void> {
const txn = mutationsStore(transaction);
const indexTxn = documentMutationsStore(transaction);
const promises: Array<PersistencePromise<void>> = [];
for (const batch of batches) {
const range = IDBKeyRange.only(this.keyForBatchId(batch.batchId));
let numDeleted = 0;
const removePromise = txn.iterate({ range }, (key, value, control) => {
numDeleted++;
return control.delete();
});
promises.push(
removePromise.next(() => {
assert(
numDeleted === 1,
'Dangling document-mutation reference found: Missing batch ' +
batch.batchId
);
})
);
for (const mutation of batch.mutations) {
const indexKey = DbDocumentMutation.key(
this.userId,
mutation.key.path,
batch.batchId
);
promises.push(indexTxn.delete(indexKey));
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(mutation.key);
}
}
}
return PersistencePromise.waitFor(promises);
}
performConsistencyCheck(
txn: PersistenceTransaction
): PersistencePromise<void> {
return this.checkEmpty(txn).next(empty => {
if (!empty) {
return PersistencePromise.resolve();
}
// Verify that there are no entries in the documentMutations index if
// the queue is empty.
const startRange = IDBKeyRange.lowerBound(
DbDocumentMutation.prefixForUser(this.userId)
);
const danglingMutationReferences: ResourcePath[] = [];
return documentMutationsStore(txn)
.iterate({ range: startRange }, (key, _, control) => {
const userID = key[0];
Eif (userID !== this.userId) {
control.done();
return;
} else {
const path = EncodedResourcePath.decode(key[1]);
danglingMutationReferences.push(path);
}
})
.next(() => {
assert(
danglingMutationReferences.length === 0,
'Document leak -- detected dangling mutation references when queue is empty. Dangling keys: ' +
danglingMutationReferences.map(p => p.canonicalString())
);
});
});
}
setGarbageCollector(gc: GarbageCollector | null): void {
this.garbageCollector = gc;
}
containsKey(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<boolean> {
const indexKey = DbDocumentMutation.prefixForPath(this.userId, key.path);
const encodedPath = indexKey[1];
const startRange = IDBKeyRange.lowerBound(indexKey);
let containsKey = false;
return documentMutationsStore(txn)
.iterate({ range: startRange, keysOnly: true }, (key, value, control) => {
const [userID, keyPath, /*batchID*/ _] = key;
if (userID === this.userId && keyPath === encodedPath) {
containsKey = true;
}
control.done();
})
.next(() => containsKey);
}
/**
* Creates a [userId, batchId] key for use with the DbMutationQueue object
* store.
*/
private keyForBatchId(batchId: BatchId): DbMutationBatchKey {
return [this.userId, batchId];
}
}
/**
 * Verifies that the given stream token is a string (the only representation
 * we can persist) and returns it narrowed to `string`.
 */
function validateStreamToken(token: ProtoByteString): string {
  const isString = typeof token === 'string';
  assert(isString, 'Persisting non-string stream token not supported.');
  return token as string;
}
/**
 * Helper to get a typed SimpleDbStore for the mutations object store.
 */
function mutationsStore(
  txn: PersistenceTransaction
): SimpleDbStore<DbMutationBatchKey, DbMutationBatch> {
  return getStore<DbMutationBatchKey, DbMutationBatch>(txn, DbMutationBatch.store);
}
/**
 * Helper to get a typed SimpleDbStore for the documentMutations object store.
 */
function documentMutationsStore(
  txn: PersistenceTransaction
): SimpleDbStore<DbDocumentMutationKey, DbDocumentMutation> {
  return getStore<DbDocumentMutationKey, DbDocumentMutation>(
    txn,
    DbDocumentMutation.store
  );
}
/**
 * Helper to get a typed SimpleDbStore for the mutationQueues object store.
 */
function mutationQueuesStore(
  txn: PersistenceTransaction
): SimpleDbStore<DbMutationQueueKey, DbMutationQueue> {
  return getStore<DbMutationQueueKey, DbMutationQueue>(txn, DbMutationQueue.store);
}
/**
* Helper to get a typed SimpleDbStore from a transaction.
*/
function getStore<KeyType extends IDBValidKey, ValueType>(
txn: PersistenceTransaction,
store: string
): SimpleDbStore<KeyType, ValueType> {
Eif (txn instanceof SimpleDbTransaction) {
return txn.store<KeyType, ValueType>(store);
} else {
return fail('Invalid transaction object provided!');
}
}